use std::{collections::BTreeMap, sync::Arc};

use flume::{Receiver, Sender};

use crate::{
    fs::{manager::StoreManager, FileId},
    ondisk::sstable::SsTableID,
    version::timestamp::Timestamp,
    DbOption,
};

/// Messages sent over the channel created by `Cleaner::new` and handled by
/// `Cleaner::listen`.
pub enum CleanTag {
    /// Registers the sstables in `gens` under timestamp `ts`. The entry starts out
    /// not marked as dropped, so its files are kept until a matching `Clean` arrives.
    Add { ts: Timestamp, gens: Vec<SsTableID> },
    /// Marks the entry registered under `ts` (if any) as dropped, then deletes the
    /// files of every dropped entry at the front of the timestamp-ordered map,
    /// stopping at the first entry that has not been dropped.
    Clean { ts: Timestamp },
    /// Immediately removes the table file located at `table_path(wal_id, level)`,
    /// without consulting the registered entries.
    RecoverClean { wal_id: FileId, level: usize },
}

/// Deletes sstable files in response to [`CleanTag`] messages.
///
/// Files registered with `CleanTag::Add` are only removed once their timestamp has
/// been marked by `CleanTag::Clean` and every earlier registered timestamp has been
/// removed as well.
pub(crate) struct Cleaner {
    /// Receiving half of the channel whose sender is returned by `Cleaner::new`.
    tag_recv: Receiver<CleanTag>,
    /// Registered sstables keyed by timestamp; the `bool` is the "dropped" flag set
    /// by `CleanTag::Clean`.
    gens_map: BTreeMap<Timestamp, (Vec<SsTableID>, bool)>,
    /// Supplies table paths, per-level filesystem paths and the channel capacity.
    option: Arc<DbOption>,
    /// Supplies the filesystem handle used to remove each file.
    manager: Arc<StoreManager>,
}

impl Cleaner {
    pub(crate) fn new(
        option: Arc<DbOption>,
        manager: Arc<StoreManager>,
    ) -> (Self, Sender<CleanTag>) {
        let (tag_send, tag_recv) = flume::bounded(option.clean_channel_buffer);

        (
            Cleaner {
                tag_recv,
                gens_map: Default::default(),
                option,
                manager,
            },
            tag_send,
        )
    }

    pub(crate) async fn listen(&mut self) -> Result<(), fusio::Error> {
        while let Ok(tag) = self.tag_recv.recv_async().await {
            match tag {
                CleanTag::Add { ts, gens } => {
                    let _ = self.gens_map.insert(ts, (gens, false));
                }
                CleanTag::Clean { ts: version_num } => {
                    if let Some((_, dropped)) = self.gens_map.get_mut(&version_num) {
                        *dropped = true;
                    }
                    while let Some((first_version, (gens, dropped))) = self.gens_map.pop_first() {
                        if !dropped {
                            let _ = self.gens_map.insert(first_version, (gens, false));
                            break;
                        }
                        for gen in gens {
                            let fs = self
                                .option
                                .level_fs_path(gen.level())
                                .map(|path| self.manager.get_fs(path))
                                .unwrap_or(self.manager.base_fs());
                            fs.remove(&self.option.table_path(gen.file_id(), gen.level()))
                                .await?;
                        }
                    }
                }
                CleanTag::RecoverClean { wal_id: gen, level } => {
                    let fs = self
                        .option
                        .level_fs_path(level)
                        .map(|path| self.manager.get_fs(path))
                        .unwrap_or(self.manager.base_fs());
                    fs.remove(&self.option.table_path(gen, level)).await?;
                }
            }
        }

        Ok(())
    }
}

#[cfg(all(test, feature = "tokio"))]
pub(crate) mod tests {
    use std::{
        sync::Arc,
        time::{Duration, Instant},
    };

    use fusio::path::{path_to_local, Path};
    use fusio_dispatch::FsOptions;
    use tempfile::TempDir;
    use tokio::time::sleep;
    use tracing::error;

    use crate::{
        executor::{tokio::TokioExecutor, Executor},
        fs::{generate_file_id, manager::StoreManager, FileId, FileType},
        inmem::immutable::tests::TestSchema,
        ondisk::sstable::SsTableID,
        version::cleaner::{CleanTag, Cleaner},
        DbOption,
    };

    /// Reports whether the table file for `file_id` at `level` currently exists on
    /// the local filesystem.
    fn table_exists(option: &DbOption, file_id: FileId, level: usize) -> bool {
        path_to_local(&option.table_path(file_id, level))
            .unwrap()
            .exists()
    }

    /// Polls every 10 ms until the table file for `file_id` at `level` is gone.
    ///
    /// Panics if the file still exists once `timeout_ms` milliseconds have elapsed.
    async fn wait_removed(option: &DbOption, file_id: FileId, level: usize, timeout_ms: u64) {
        let deadline = Instant::now() + Duration::from_millis(timeout_ms);
        while table_exists(option, file_id, level) {
            if Instant::now() >= deadline {
                panic!(
                    "timeout waiting for removal: {}",
                    path_to_local(&option.table_path(file_id, level))
                        .unwrap()
                        .display()
                );
            }
            sleep(Duration::from_millis(10)).await;
        }
    }

    /// Drives a spawned `Cleaner` through `Add`, `Clean` and `RecoverClean` tags and
    /// checks which level-0 table files remain on disk after each step.
    #[tokio::test]
    async fn test_cleaner() {
        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap());
        let option = Arc::new(DbOption::new(
            Path::from_filesystem_path(temp_dir.path()).unwrap(),
            &TestSchema,
        ));

        let gen_0 = generate_file_id();
        let gen_1 = generate_file_id();
        let gen_2 = generate_file_id();
        let gen_3 = generate_file_id();
        let fs = option
            .level_fs_path(0)
            .map(|path| manager.get_fs(path))
            .unwrap_or(manager.base_fs());
        // Create one level-0 table file per generated id.
        for file_id in [gen_0, gen_1, gen_2, gen_3] {
            fs.open_options(
                &option.table_path(file_id, 0),
                FileType::Parquet.open_options(false),
            )
            .await
            .unwrap();
        }

        let (mut cleaner, tx) = Cleaner::new(option.clone(), manager.clone());

        let executor = TokioExecutor::default();

        executor.spawn(async move {
            if let Err(err) = cleaner.listen().await {
                error!("[Cleaner Error]: {}", err)
            }
        });

        // Register the tables out of timestamp order; gen_3 is never registered.
        for tag in [
            CleanTag::Add {
                ts: 1.into(),
                gens: vec![SsTableID::new(gen_1, 0)],
            },
            CleanTag::Add {
                ts: 0.into(),
                gens: vec![SsTableID::new(gen_0, 0)],
            },
            CleanTag::Add {
                ts: 2.into(),
                gens: vec![SsTableID::new(gen_2, 0)],
            },
        ] {
            tx.send_async(tag).await.unwrap();
        }

        // Dropping the newest timestamp must not delete anything while older
        // timestamps are still live.
        tx.send_async(CleanTag::Clean { ts: 2.into() })
            .await
            .unwrap();

        // FIXME: these assertions run right after the send without waiting for the
        // cleaner task to handle the tag, so they also pass when the tag has not
        // been processed yet.
        for file_id in [gen_0, gen_1, gen_2, gen_3] {
            assert!(table_exists(&option, file_id, 0));
        }

        // Dropping the oldest timestamp removes only gen_0: timestamp 1 is still live.
        tx.send_async(CleanTag::Clean { ts: 0.into() })
            .await
            .unwrap();
        wait_removed(&option, gen_0, 0, 2_000).await;
        for file_id in [gen_1, gen_2, gen_3] {
            assert!(table_exists(&option, file_id, 0));
        }

        // Dropping timestamp 1 releases gen_1 and the already-dropped gen_2.
        tx.send_async(CleanTag::Clean { ts: 1.into() })
            .await
            .unwrap();
        wait_removed(&option, gen_1, 0, 2_000).await;
        wait_removed(&option, gen_2, 0, 2_000).await;
        assert!(table_exists(&option, gen_3, 0));

        // RecoverClean removes a file directly, without any prior registration.
        tx.send_async(CleanTag::RecoverClean {
            wal_id: gen_3,
            level: 0,
        })
        .await
        .unwrap();
        wait_removed(&option, gen_3, 0, 2_000).await;
    }
}
